package com.elastic.search.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

/**
 * mysql 数据解析关注的表，Perl正则表达式.
 * 多个正则之间以逗号(,)分隔，转义符需要双斜杠(\\)
 * 常见例子：
 * 1.  所有表：.*   or  .*\\..*
 * 2.  canal schema下所有表： canal\\..*
 * 3.  canal下的以canal打头的表：canal\\.canal.*
 * 4.  canal schema下的一张表：canal.test1
 * 5.  多个规则组合使用：canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
 * 注意：此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql，所以无法准确提取tableName进行过滤)
 * connector.subscribe(".*\\..*");
 * <p>
 * canal
 *
 * @author huangdeyao
 * @date 2019/6/11 12:16
 */
@Component
public class CanalClient implements DisposableBean {

    private static final Logger logger = LoggerFactory.getLogger(CanalClient.class);

    private CanalConnector canalConnector;

    @Value("${canal.host}")
    private String canalHost;
    @Value("${canal.port}")
    private String canalPort;
    @Value("${canal.destination}")
    private String canalDestination;
    @Value("${canal.username}")
    private String canalUsername;
    @Value("${canal.password}")
    private String canalPassword;


    @Bean
    public CanalConnector getCanalConnector() {
        canalConnector = CanalConnectors.newClusterConnector(Lists.newArrayList(new InetSocketAddress(canalHost, Integer.valueOf(canalPort))), canalDestination, canalUsername, canalPassword);
        canalConnector.connect();
        // 指定filter，格式 {database}.{table}，这里不做过滤，过滤操作留给用户
        canalConnector.subscribe("vue.sys_user,vue.article");
        // 回滚寻找上次中断的位置
        canalConnector.rollback();
        logger.info("canal客户端启动成功");
        return canalConnector;
    }

    @Override
    public void destroy() throws Exception {
        if (canalConnector != null) {
            canalConnector.disconnect();
        }
    }
}
